View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.report.ConsoleStream;
23  import org.junit.runner.Description;
24  import org.junit.runners.model.RunnerScheduler;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.PrintStream;
28  import java.util.Collection;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.CopyOnWriteArraySet;
32  import java.util.concurrent.RejectedExecutionException;
33  import java.util.concurrent.RejectedExecutionHandler;
34  import java.util.concurrent.ThreadPoolExecutor;
35  
36  /**
37   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
38   * a master scheduler can shutdown slaves.
39   * <p/>
40   * The scheduler objects should be first created (and wired) and set in runners
41   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
42   * <p/>
43   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
44   *
45   * @author Tibor Digana (tibor17)
46   * @since 2.16
47   */
48  public class Scheduler
49      implements RunnerScheduler
50  {
51      private final Balancer balancer;
52  
53      private final SchedulingStrategy strategy;
54  
55      private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
56  
57      private final Description description;
58  
59      private final ConsoleStream logger;
60  
61      private volatile boolean shutdown = false;
62  
63      private volatile boolean started = false;
64  
65      private volatile boolean finished = false;
66  
67      private volatile Controller masterController;
68  
69      /**
70       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
71       * <p/>
72       * You can use it with one infinite thread pool shared in strategies across all
73       * suites, class runners, etc.
74       */
75      public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy )
76      {
77          this( logger, description, strategy, -1 );
78      }
79  
80      /**
81       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
82       * <p/>
83       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
84       * {@link Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy)}
85       * or {@link Scheduler(ConsoleStream, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
86       *
87       * @param logger current logger implementation
88       * @param description description of current runner
89       * @param strategy    scheduling strategy with a shared thread pool
90       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
91       * @throws NullPointerException if null <tt>strategy</tt>
92       */
93      public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, int concurrency )
94      {
95          this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
96      }
97  
98      /**
99       * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
100      * against other groups of schedulers. The schedulers share one pool.
101      * <p/>
102      * Unlike in {@link Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)} which was
103      * limiting the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em>
104      * <tt>balancer</tt> is limiting the concurrency of all children in runners having schedulers created by this
105      * constructor.
106      *
107      * @param logger current logger implementation
108      * @param description description of current runner
109      * @param strategy    scheduling strategy which may share threads with other strategy
110      * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
111      * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
112      */
113     public Scheduler( ConsoleStream logger, Description description, SchedulingStrategy strategy, Balancer balancer )
114     {
115         strategy.setDefaultShutdownHandler( newShutdownHandler() );
116         this.logger = logger;
117         this.description = description;
118         this.strategy = strategy;
119         this.balancer = balancer;
120         masterController = null;
121     }
122 
123     /**
124      * Can be used by e.g. a runner having parallel classes in use case with parallel
125      * suites, classes and methods sharing the same thread pool.
126      *
127      * @param logger current logger implementation
128      * @param description     description of current runner
129      * @param masterScheduler scheduler sharing own threads with this slave
130      * @param strategy        scheduling strategy for this scheduler
131      * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
132      * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
133      */
134     public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
135                       SchedulingStrategy strategy, Balancer balancer )
136     {
137         this( logger, description, strategy, balancer );
138         strategy.setDefaultShutdownHandler( newShutdownHandler() );
139         masterScheduler.register( this );
140     }
141 
142     /**
143      * @param masterScheduler a reference to
144      * {@link Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)}
145      *                        or {@link Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy)}
146      * @see Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy)
147      * @see Scheduler(ConsoleStream, org.junit.runner.Description, SchedulingStrategy, int)
148      */
149     public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
150                       SchedulingStrategy strategy, int concurrency )
151     {
152         this( logger, description, strategy, concurrency );
153         strategy.setDefaultShutdownHandler( newShutdownHandler() );
154         masterScheduler.register( this );
155     }
156 
157     /**
158      * Should be used with individual pools on suites, classes and methods, see
159      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
160      * <p/>
161      * Cached thread pool is infinite and can be always shared.
162      */
163     public Scheduler( ConsoleStream logger, Description description, Scheduler masterScheduler,
164                       SchedulingStrategy strategy )
165     {
166         this( logger, description, masterScheduler, strategy, 0 );
167     }
168 
169     private void setController( Controller masterController )
170     {
171         if ( masterController == null )
172         {
173             throw new NullPointerException( "null ExecutionController" );
174         }
175         this.masterController = masterController;
176     }
177 
178     /**
179      * @param slave a slave scheduler to register
180      * @return <tt>true</tt> if successfully registered the <tt>slave</tt>.
181      */
182     private boolean register( Scheduler slave )
183     {
184         boolean canRegister = slave != null && slave != this;
185         if ( canRegister )
186         {
187             Controller controller = new Controller( slave );
188             canRegister = !slaves.contains( controller );
189             if ( canRegister )
190             {
191                 slaves.add( controller );
192                 slave.setController( controller );
193             }
194         }
195         return canRegister;
196     }
197 
198     /**
199      * @return <tt>true</tt> if new tasks can be scheduled.
200      */
201     private boolean canSchedule()
202     {
203         return !shutdown && ( masterController == null || masterController.canSchedule() );
204     }
205 
206     protected void logQuietly( Throwable t )
207     {
208         ByteArrayOutputStream out = new ByteArrayOutputStream();
209         PrintStream stream = new PrintStream( out );
210         try
211         {
212             t.printStackTrace( stream );
213         }
214         finally
215         {
216             stream.close();
217         }
218         logger.println( out.toString() );
219     }
220 
221     protected void logQuietly( String msg )
222     {
223         logger.println( msg );
224     }
225 
226     /**
227      * Attempts to stop all actively executing tasks and immediately returns a collection
228      * of descriptions of those tasks which have started prior to this call.
229      * <p/>
230      * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
231      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
232      *
233      * @param stopNow if <tt>true</tt> interrupts waiting test methods
234      * @return collection of descriptions started before shutting down
235      */
236     protected ShutdownResult describeStopped( boolean stopNow )
237     {
238         Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
239         Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
240         stop( executedTests, incompleteTests, false, stopNow );
241         return new ShutdownResult( executedTests, incompleteTests );
242     }
243 
244     /**
245      * Stop/Shutdown/Interrupt scheduler and its children (if any).
246      *
247      * @param executedTests       Started tests which have finished normally or abruptly till called this method.
248      * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
249      * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
250      *                            When the runner of
251      *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
252      *                            is finished in
253      *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
254      *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
255      *                            See the unit test <em>ParallelComputerBuilder#timeoutAndForcedShutdown()</em>.
256      * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
257      *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
258      *                            {@link Thread#interrupt()}.
259      */
260     private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
261                        boolean tryCancelFutures, boolean stopNow )
262     {
263         shutdown = true;
264         try
265         {
266             if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
267             {
268                 if ( executedTests != null )
269                 {
270                     executedTests.add( description );
271                 }
272 
273                 if ( incompleteTests != null && !finished )
274                 {
275                     incompleteTests.add( description );
276                 }
277             }
278 
279             for ( Controller slave : slaves )
280             {
281                 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
282             }
283         }
284         finally
285         {
286             try
287             {
288                 balancer.releaseAllPermits();
289             }
290             finally
291             {
292                 if ( stopNow )
293                 {
294                     strategy.stopNow();
295                 }
296                 else if ( tryCancelFutures )
297                 {
298                     strategy.stop();
299                 }
300                 else
301                 {
302                     strategy.disable();
303                 }
304             }
305         }
306     }
307 
308     protected boolean shutdownThreadPoolsAwaitingKilled()
309     {
310         if ( masterController == null )
311         {
312             stop( null, null, true, false );
313             boolean isNotInterrupted = true;
314             if ( strategy != null )
315             {
316                 isNotInterrupted = strategy.destroy();
317             }
318             for ( Controller slave : slaves )
319             {
320                 isNotInterrupted &= slave.destroy();
321             }
322             return isNotInterrupted;
323         }
324         else
325         {
326             throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
327         }
328     }
329 
330     protected void beforeExecute()
331     {
332     }
333 
334     protected void afterExecute()
335     {
336     }
337 
338     public void schedule( Runnable childStatement )
339     {
340         if ( childStatement == null )
341         {
342             logQuietly( "cannot schedule null" );
343         }
344         else if ( canSchedule() && strategy.canSchedule() )
345         {
346             try
347             {
348                 boolean isNotInterrupted = balancer.acquirePermit();
349                 if ( isNotInterrupted && !shutdown )
350                 {
351                     Runnable task = wrapTask( childStatement );
352                     strategy.schedule( task );
353                     started = true;
354                 }
355             }
356             catch ( RejectedExecutionException e )
357             {
358                 stop( null, null, true, false );
359             }
360             catch ( Throwable t )
361             {
362                 balancer.releasePermit();
363                 logQuietly( t );
364             }
365         }
366     }
367 
368     public void finished()
369     {
370         try
371         {
372             strategy.finished();
373         }
374         catch ( InterruptedException e )
375         {
376             logQuietly( e );
377         }
378         finally
379         {
380             finished = true;
381         }
382     }
383 
384     private Runnable wrapTask( final Runnable task )
385     {
386         return new Runnable()
387         {
388             public void run()
389             {
390                 try
391                 {
392                     beforeExecute();
393                     task.run();
394                 }
395                 finally
396                 {
397                     try
398                     {
399                         afterExecute();
400                     }
401                     finally
402                     {
403                         balancer.releasePermit();
404                     }
405                 }
406             }
407         };
408     }
409 
410     protected ShutdownHandler newShutdownHandler()
411     {
412         return new ShutdownHandler();
413     }
414 
415     /**
416      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
417      */
418     private final class Controller
419     {
420         private final Scheduler slave;
421 
422         private Controller( Scheduler slave )
423         {
424             this.slave = slave;
425         }
426 
427         /**
428          * @return <tt>true</tt> if new children can be scheduled.
429          */
430         boolean canSchedule()
431         {
432             return Scheduler.this.canSchedule();
433         }
434 
435         void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
436                    boolean tryCancelFutures, boolean shutdownNow )
437         {
438             slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
439         }
440 
441         /**
442          * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
443          */
444         boolean destroy()
445         {
446             return slave.strategy.destroy();
447         }
448 
449         @Override
450         public int hashCode()
451         {
452             return slave.hashCode();
453         }
454 
455         @Override
456         public boolean equals( Object o )
457         {
458             return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
459         }
460     }
461 
462     /**
463      * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
464      * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
465      * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
466      * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
467      * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
468      * schedulers recursively as well.
469      */
470     public class ShutdownHandler
471         implements RejectedExecutionHandler
472     {
473         private volatile RejectedExecutionHandler poolHandler;
474 
475         protected ShutdownHandler()
476         {
477             poolHandler = null;
478         }
479 
480         public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
481         {
482             this.poolHandler = poolHandler;
483         }
484 
485         public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
486         {
487             if ( executor.isShutdown() )
488             {
489                 Scheduler.this.stop( null, null, true, false );
490             }
491             final RejectedExecutionHandler poolHandler = this.poolHandler;
492             if ( poolHandler != null )
493             {
494                 poolHandler.rejectedExecution( r, executor );
495             }
496         }
497     }
498 }